package opmq

import (
	"net"
	_ "net/http/pprof"
	"os"
	"sync"
	"time"
)

var Broker *Server

// Server mqtt服务端
type Server struct {
	vueMux sync.Mutex // 前端试图锁
	// 这个连接主要是给前端人员看的
	// 只有在连接刚发送连接报文才有意义,此时才有cid
	ClientConns []*clientConn

	Auth *Auth

	MqttPort string // 本身的mqtt服务端口

	subs *subscription

	l net.Listener

	HttpServer *HttpServer

	// 集群
	ClusterSelf *ClusterSelf
}

func NewServer(l net.Listener, opts ...Options) *Server {
	s := &Server{
		l:           l,
		ClientConns: make([]*clientConn, 0),
		ClusterSelf: NewClusterSelf(),
	}
	s.subs = NewSubscription(s)

	for _, opt := range opts {
		opt(s)
	}

	return s
}

func (s *Server) Serve() {

	os.Mkdir("topicFlag", os.ModePerm)

	go func() {
		for {
			conn, err := s.l.Accept()
			if err != nil {
				ERROR.Println(err)
			}

			// todo: 处理的不好
			NewClientConn("", conn, s)
		}
	}()

	go s.Grpc()

	// 集群操作
	go func() {
		for {
			s.ClusterSelf.PushAndPull()
			time.Sleep(time.Second * 6)
		}
	}()

	WaitSignal(s)
}

func (s *Server) GetAllClientData() []*ClientsProperty {
	res := make([]*ClientsProperty, 0)
	for _, v := range s.ClientConns {
		v.ClientsProperty.MqueueLen = int64(len(v.job)) // 更新消息丢雷长度，这里只是计算一次，放置cpu浪费
		res = append(res, v.ClientsProperty)
	}

	return res
}

// AddOrUpdateClientConn 前端试图连接添加
// 连接报文可能发送多次，所以需要更新试图，但是连接的地址的指针不变，更新也无所谓
func (s *Server) AddOrUpdateClientConn(conn *clientConn) {
	s.vueMux.Lock()
	defer s.vueMux.Unlock()
	if conn.Cid == "" {
		DEBUG.Println("连接cid为空")
		return
	}

	isHave := false
	for k, v := range s.ClientConns {
		if v.Cid == conn.Cid {
			s.ClientConns[k] = conn
			isHave = true
			break
		}
	}

	if !isHave {
		s.ClientConns = append(s.ClientConns, conn)
	}
}

// GetUniqueSublist 获取所有不重名的订阅topic名字
func (s *Server) GetUniqueSublist() []string {
	data := make([]string, 0)

	m := make(map[string]int)

	for _, v := range s.ClientConns {
		for _, v2 := range v.SubList {
			_, ok := m[v2]
			if ok {
				continue
			}
			m[v2] = 0

			data = append(data, v2)
		}
	}

	return data
}
